{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Using the Platform's Streaming API"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The platform’s Streaming API enables working with data in the platform as streams. For more information, see the [streams overview](https://www.iguazio.com/docs/latest-release/concepts/streams/)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Initialize"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import v3io.dataplane"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create a dataplane client"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "v3io_client = v3io.dataplane.Client()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "> **Note**: You can pass the `endpoint` and `access_key` parameters to the client explicitly. The following code is equivalent to using the default values:\n",
    ">\n",
    ">``` python\n",
    ">from os import getenv\n",
    ">v3io_client = v3io.dataplane.Client(endpoint='http://v3io-webapi:8081',\n",
    ">                                    access_key=getenv('V3IO_ACCESS_KEY'))\n",
    ">```\n",
    ">\n",
    "> When calling the API from outside the cluster, you can obtain the URL of your cluster by copying the API URL of the web-APIs service (`webapi`) from the **Services** dashboard page. You can select between two types of URLs:\n",
    ">\n",
    "> - **HTTPS Direct** (recommended): a URL of the format `https://<tenant IP>:<web-APIs port>`; for example, `https://default-tenant.app.mycluster.iguazio.com:8443`.\n",
    "> - **HTTPS**: a URL of the format `https://webapi.<tenant IP>`; for example, `https://webapi.default-tenant.app.mycluster.iguazio.com`.\n",
    ">\n",
    "> For more information, see the [Data-Service Web-API General Structure](https://www.iguazio.com/docs/latest-release/reference/api-reference/web-apis/data-service-web-api-gen-struct/) documentation."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "> **Maximum number of parallel connections**: Another noteworthy parameter is `max_connections`, which defines the maximum number of parallel connections for batch operations. If left unspecified, the default is 8 connections. For more information, see the [Put Multiple Records](#Put-Multiple-Records) section in this notebook."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Set path"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "All data in the platform is stored in data containers. This example uses the predefined \"users\" container. For more information, see the [Data containers, collections, and objects documentation](https://www.iguazio.com/docs/latest-release/concepts/containers-collections-objects)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "CONTAINER = 'users'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Set the data path in which to store the stream:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "from os import getenv, path\n",
    "\n",
    "V3IO_USERNAME = getenv('V3IO_USERNAME')\n",
    "STREAM_PATH = path.join(V3IO_USERNAME, 'examples', 'v3io', 'stream')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a Stream"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Creates and configures a new stream. The new stream is available immediately upon its creation.\n",
    "\n",
    "You must configure the stream's shard count (the number of shards in the stream). You can increase the shard count at any time, but you cannot reduce it.\n",
    "\n",
    "You can also set the stream's retention period (default is 24 hours). After this period elapses, when new records are added to the stream, the earliest ingested records are deleted."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "204\n"
     ]
    }
   ],
   "source": [
    "response = v3io_client.stream.create(container=CONTAINER,\n",
    "                                     stream_path=STREAM_PATH,\n",
    "                                     shard_count=8)\n",
    "print(response.status_code)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Put Records"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Adds records to a stream."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We'll define a function that converts text to a list of lowercase words that contain only the letters a-z:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "146"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import re\n",
    "regex_non_lower_case = re.compile('[^a-z]')\n",
    "\n",
    "def text_to_words(text):\n",
    "    # split into words\n",
    "    words = text.split()\n",
    "    # convert to lower case\n",
    "    words = [w.lower() for w in words]\n",
    "    # Keep only letters\n",
    "    words = [regex_non_lower_case.sub('', w) for w in words]\n",
    "    \n",
    "    return words\n",
    "\n",
    "text1 = \"WOLF, meeting with a Lamb astray from the fold, resolved not to lay violent hands on him, but to find some plea to justify to the Lamb the Wolf’s right to eat him. He thus addressed him: “Sirrah, last year you grossly insulted me.” “Indeed,” bleated the Lamb in a mournful tone of voice, “I was not then born.” Then said the Wolf, “You feed in my pasture.” “No, good sir,” replied the Lamb, “I have not yet tasted grass.” Again said the Wolf, “You drink of my well.” “No,” exclaimed the Lamb, “I never yet drank water, for as yet my mother’s milk is both food and drink to me.” Upon which the Wolf seized him and ate him up, saying, “Well! I won’t remain supperless, even though you refute every one of my imputations.” The tyrant will always find a pretext for his tyranny.\"\n",
    "words1 = text_to_words(text1)\n",
    "len(words1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Convert the list of words to records. Records are passed as a list of dictionaries, where each dictionary represents a single record and its `data` key contains the record data.\n",
    "We display the first 5 records:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[{'data': 'wolf'}, {'data': 'meeting'}, {'data': 'with'}, {'data': 'a'}, {'data': 'lamb'}]\n"
     ]
    }
   ],
   "source": [
    "records = [{'data': word} for word in words1]\n",
    "print(records[:5])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "200\n"
     ]
    }
   ],
   "source": [
    "response = v3io_client.stream.put_records(container=CONTAINER,\n",
    "                                          stream_path=STREAM_PATH,\n",
    "                                          records=records)\n",
    "print(response.status_code)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Write another set of records to the stream"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "text2 = \"A BAT who fell upon the ground and was caught by a Weasel pleaded to be spared his life. The Weasel refused, saying that he was by nature the enemy of all birds. The Bat assured him that he was not a bird, but a mouse, and thus was set free. Shortly afterwards the Bat again fell to the ground and was caught by another Weasel, whom he likewise entreated not to eat him. The Weasel said that he had a special hostility to mice. The Bat assured him that he was not a mouse, but a bat, and thus a second time escaped. It is wise to turn circumstances to good account.\"\n",
    "response = v3io_client.stream.put_records(container=CONTAINER,\n",
    "                                          stream_path=STREAM_PATH,\n",
    "                                          records=[{'data': word} for word in text_to_words(text2)])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Get Records"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Multiple consumer instances can consume data from the same stream. A consumer retrieves records from a specific shard. It is recommended that you distribute the data consumption among several consumer instances (“workers”), assigning each instance one or more shards.\n",
    "\n",
    "For each shard, the consumer should determine the location within the shard from which to begin consuming records. This can be the earliest ingested record, the end of the shard, the first ingested record starting at a specific time, or a specific record identified by its sequence number (a unique record identifier that is assigned by the platform based on the record’s ingestion time). The consumer first uses a seek operation to obtain the desired consumption location, and then provides this location as the starting point for its record consumption. The consumption operation returns the location of the next record to consume within the shard, and this location should be used as the location for a subsequent consumption operation, allowing for sequential record consumption."
   ]
  },
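  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a minimal sketch of the recommendation above to distribute shards among workers, the following helper assigns shard IDs to workers in round-robin order. `assign_shards` and its parameters are hypothetical names used for illustration and are not part of the SDK; each worker would then seek and read only its own shards.\n",
    "\n",
    "``` python\n",
    "def assign_shards(shard_count, num_workers):\n",
    "    # Hypothetical helper: worker w gets shards w, w + num_workers, ...\n",
    "    if num_workers < 1:\n",
    "        raise ValueError('num_workers must be at least 1')\n",
    "    return [list(range(worker, shard_count, num_workers))\n",
    "            for worker in range(num_workers)]\n",
    "\n",
    "# Example: distribute 8 shards among 3 workers\n",
    "assignments = assign_shards(shard_count=8, num_workers=3)\n",
    "print(assignments)  # [[0, 3, 6], [1, 4, 7], [2, 5]]\n",
    "```"
   ]
  },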
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "shard_id = 0\n",
    "seek_response = v3io_client.stream.seek(container=CONTAINER,\n",
    "                                        stream_path=STREAM_PATH,\n",
    "                                        shard_id=shard_id,\n",
    "                                        seek_type='EARLIEST')\n",
    "location = seek_response.output.location"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, we read from the stream. By default, `get_records` limits the number of returned records to 1,000. For the sake of this demonstration, we limit the number of returned records to 10 by setting the `limit` parameter."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['wolf', 'fold', 'him', 'to', 'him', 'you', 'in', 'not', 'feed', 'the']\n"
     ]
    }
   ],
   "source": [
    "get_response = v3io_client.stream.get_records(container=CONTAINER,\n",
    "                                              stream_path=STREAM_PATH,\n",
    "                                              shard_id=shard_id,\n",
    "                                              location=location,\n",
    "                                              limit=10)\n",
    "words_shard0 = [record.data.decode('utf-8') for record in get_response.output.records]\n",
    "print(words_shard0)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['again', 'well', 'drank', 'is', 'which', 'up', 'though', 'the', 'his', 'ground']\n"
     ]
    }
   ],
   "source": [
    "location = get_response.output.next_location\n",
    "\n",
    "get_response = v3io_client.stream.get_records(container=CONTAINER,\n",
    "                                              stream_path=STREAM_PATH,\n",
    "                                              shard_id=shard_id,\n",
    "                                              location=location,\n",
    "                                              limit=10)\n",
    "words_shard0 = [record.data.decode('utf-8') for record in get_response.output.records]\n",
    "print(words_shard0)"
   ]
  },
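  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The seek and read steps above can be combined into a loop that follows `next_location` until no more records are returned. The following is a minimal sketch: `consume_shard` is a hypothetical helper that is not part of the SDK. It uses only the `seek` and `get_records` calls shown in this notebook, and it assumes that an empty list of records means that the consumer reached the end of the shard.\n",
    "\n",
    "``` python\n",
    "def consume_shard(client, container, stream_path, shard_id, limit=100):\n",
    "    # Hypothetical helper: yield the decoded data of each record\n",
    "    # that is currently available in one shard, oldest first.\n",
    "    seek_response = client.stream.seek(container=container,\n",
    "                                       stream_path=stream_path,\n",
    "                                       shard_id=shard_id,\n",
    "                                       seek_type='EARLIEST')\n",
    "    location = seek_response.output.location\n",
    "    while True:\n",
    "        get_response = client.stream.get_records(container=container,\n",
    "                                                 stream_path=stream_path,\n",
    "                                                 shard_id=shard_id,\n",
    "                                                 location=location,\n",
    "                                                 limit=limit)\n",
    "        records = get_response.output.records\n",
    "        if not records:\n",
    "            # Assumption: an empty batch means that we caught up\n",
    "            break\n",
    "        for record in records:\n",
    "            yield record.data.decode('utf-8')\n",
    "        # Continue from the location that the previous call returned\n",
    "        location = get_response.output.next_location\n",
    "\n",
    "# Possible usage with the client and stream from this notebook:\n",
    "# words = list(consume_shard(v3io_client, CONTAINER, STREAM_PATH, shard_id=0))\n",
    "```\n",
    "\n",
    "Writing the helper as a generator lets the caller stop consuming at any point without reading the rest of the shard."
   ]
  },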
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Describe Stream"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can retrieve a stream’s configuration, including the shard count and retention period."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "200\n"
     ]
    }
   ],
   "source": [
    "response = v3io_client.stream.describe(container=CONTAINER,\n",
    "                                       stream_path=STREAM_PATH)\n",
    "print(response.status_code)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Shards: 8\n",
      "Retention period: 24 hours\n"
     ]
    }
   ],
   "source": [
    "print(f'Shards: {response.output.shard_count}\\nRetention period: {response.output.retention_period_hours} hours')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Update Stream"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Updates a stream's configuration by increasing its shard count. The changes are applied immediately.\n",
    "\n",
    "> **Note**: You can increase the shard count at any time, but you cannot reduce it. From the platform’s perspective, there is virtually no cost to creating even thousands of shards. However, if you increase a stream’s shard count after its creation, new records with a previously used partition key are assigned either to the same shard that was previously used for this partition key or to a new shard. For more information, see [stream sharding and partitioning](https://www.iguazio.com/docs/latest-release/concepts/streams/#stream-sharding-and-partitioning).\n",
    "\n",
    "> **Spark-Streaming Note**: To use the Spark Streaming API to consume records from new shards after a shard-count increase, you must first restart the consumer application."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "204\n"
     ]
    }
   ],
   "source": [
    "response = v3io_client.stream.update(container=CONTAINER,\n",
    "                                     stream_path=STREAM_PATH,\n",
    "                                     shard_count=10)\n",
    "print(response.status_code)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Describe the stream again to see the updated shard count"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "10\n"
     ]
    }
   ],
   "source": [
    "response = v3io_client.stream.describe(container=CONTAINER,\n",
    "                                       stream_path=STREAM_PATH)\n",
    "print(response.output.shard_count)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Assigning Shard IDs"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When we retrieved records in the [Get Records](#Get-Records) section, we didn't get all the words. The reason is that, by default, the platform assigns records to shards using a round-robin algorithm, and our consumer code read from a single shard. To control which shard receives a record, you can optionally assign the record to a specific shard by setting its `shard_id` value, or associate the record with a partition key, by setting its `partition_key` value, to ensure that similar records are assigned to the same shard. For more information, see [stream sharding and partitioning](https://www.iguazio.com/docs/latest-release/concepts/streams/#stream-sharding-and-partitioning)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's leave the existing shards (shards 0 through 9) as they are and increase the number of shards to 12, which adds shards 10 and 11:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = v3io_client.stream.update(container=CONTAINER,\n",
    "                                     stream_path=STREAM_PATH,\n",
    "                                     shard_count=12)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Write the first text to shard 10"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = v3io_client.stream.put_records(container=CONTAINER,\n",
    "                                          stream_path=STREAM_PATH,\n",
    "                                          records=[{'data': word, 'shard_id': 10} for word in words1])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "And write the second text to shard 11"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = v3io_client.stream.put_records(container=CONTAINER,\n",
    "                                          stream_path=STREAM_PATH,\n",
    "                                          records=[{'data': word, 'shard_id': 11} for word in text_to_words(text2)])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, read from shard 10"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['wolf', 'meeting', 'with', 'a', 'lamb', 'astray', 'from', 'the', 'fold', 'resolved']...\n",
      "All words appear as expected\n"
     ]
    }
   ],
   "source": [
    "shard_id = 10\n",
    "seek_response = v3io_client.stream.seek(container=CONTAINER,\n",
    "                                        stream_path=STREAM_PATH,\n",
    "                                        shard_id=shard_id,\n",
    "                                        seek_type='EARLIEST')\n",
    "location = seek_response.output.location\n",
    "\n",
    "get_response = v3io_client.stream.get_records(container=CONTAINER,\n",
    "                                              stream_path=STREAM_PATH,\n",
    "                                              shard_id=shard_id,\n",
    "                                              location=location)\n",
    "words_shard10 = [record.data.decode('utf-8') for record in get_response.output.records]\n",
    "print(f'{words_shard10[:10]}...')\n",
    "if words_shard10 == words1[:len(words_shard10)]:\n",
    "    print(\"All words appear as expected\")\n",
    "else:\n",
    "    print(\"Error, something is wrong\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Put Multiple Records"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To get the highest possible throughput, we can send many requests to the data layer and then wait for all the responses to arrive, rather than sending each request and waiting for its response. The SDK supports this through batching. Any API call can be made through the client's built-in `batch` object. The API call receives the same arguments that it would normally receive (except for `raise_for_status`), and it does not block until the response arrives. To wait for all pending responses, call `wait()` on the `batch` object."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "> **Note**: The number of parallel connections is determined by the `max_connections` parameter that is set when the client is created. For instance, to use 16 parallel connections, create the client at the beginning of the notebook with `v3io_client = v3io.dataplane.Client(max_connections=16)`. The default is 8 connections."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`put_records` accepts a maximum of 1,000 records per request. If this limit is exceeded, the request fails. Therefore, to write more records we need to call `put_records` multiple times. In this example, we'll send a large number of events. We create a generator that yields lists of events. Each event has the following fields:\n",
    "\n",
    "``` python\n",
    "'user': <user_id>\n",
    "'time': <event_time>\n",
    "'url': <url>\n",
    "```\n",
    "Each user is selected at random, the URL is a random string, and the event times of the generated events increase monotonically."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [],
   "source": [
    "import random\n",
    "import string\n",
    "from datetime import timedelta\n",
    "\n",
    "def generate_events(users, start_time, num_events, batch_size):\n",
    "    curr_time = start_time\n",
    "    \n",
    "    # letters to use for the URL generation \n",
    "    url_letters = string.ascii_lowercase + string.digits\n",
    "    \n",
    "    # Iterate through the number of events in batch_size increments\n",
    "    for i in range(0, num_events, batch_size):\n",
    "        events = []\n",
    "        # The last batch is smaller when num_events is not a multiple of batch_size\n",
    "        for _ in range(min(batch_size, num_events - i)):\n",
    "            # choose a length to generate for the URL suffix\n",
    "            length = random.choice(range(20,30))\n",
    "            \n",
    "            # choose a user and the URL\n",
    "            event = {\n",
    "                'user': random.choice(users),\n",
    "                'time': curr_time,\n",
    "                'url': f\"https://example.com/{''.join(random.choice(url_letters) for _ in range(length))}\"\n",
    "            }\n",
    "            \n",
    "            # increase the time for the next event randomly by a range of 0 to 10000 microseconds\n",
    "            curr_time += timedelta(microseconds=random.choice(range(0,10000)))\n",
    "            events.append(event)\n",
    "            \n",
    "        # yield returns the events as a generator, we will receive this in our loop\n",
    "        yield events"
   ]
  },
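  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The generator above produces its events in batches. When the records already exist as a single list, the 1,000-record limit of `put_records` can be respected by slicing the list into chunks. The following is a minimal sketch: `chunk_records` and `MAX_RECORDS_PER_REQUEST` are hypothetical names used for illustration and are not part of the SDK.\n",
    "\n",
    "``` python\n",
    "MAX_RECORDS_PER_REQUEST = 1000  # the put_records limit described above\n",
    "\n",
    "def chunk_records(records, chunk_size=MAX_RECORDS_PER_REQUEST):\n",
    "    # Hypothetical helper: yield consecutive slices of at most chunk_size records\n",
    "    for start in range(0, len(records), chunk_size):\n",
    "        yield records[start:start + chunk_size]\n",
    "\n",
    "# Example: 2,500 records are split into chunks of 1000, 1000, and 500\n",
    "all_records = [{'data': f'event-{index}'} for index in range(2500)]\n",
    "print([len(chunk) for chunk in chunk_records(all_records)])  # [1000, 1000, 500]\n",
    "```\n",
    "\n",
    "Each chunk can then be passed as the `records` argument of a separate `put_records` call."
   ]
  },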
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Test the generator, print just a few events"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[{'user': 'user_3', 'time': datetime.datetime(2020, 10, 7, 5, 53, 0, 69161), 'url': 'https://example.com/rpoig8f1cbfno6b9m80o2'}, {'user': 'user_0', 'time': datetime.datetime(2020, 10, 7, 5, 53, 0, 78815), 'url': 'https://example.com/k1vrjnvgfygwwqc38hyf9sxm'}]\n",
      "[{'user': 'user_84', 'time': datetime.datetime(2020, 10, 7, 5, 53, 0, 79954), 'url': 'https://example.com/osfogyr3xkxwnrek8pk3'}, {'user': 'user_81', 'time': datetime.datetime(2020, 10, 7, 5, 53, 0, 86170), 'url': 'https://example.com/9oudocuzrenun5z3jqip98q1'}]\n"
     ]
    }
   ],
   "source": [
    "random.seed(42)\n",
    "from datetime import datetime\n",
    "\n",
    "for events in generate_events(users=[f'user_{index}' for index in range(0,100)],\n",
    "                              start_time=datetime(2020, 10, 7, 5, 53, 0, 69161),\n",
    "                              num_events=4,\n",
    "                              batch_size=2):\n",
    "    print(events)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Seek the latest position in one of the shards, in order to later retrieve the new data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [],
   "source": [
    "shard_id = 0\n",
    "seek_response = v3io_client.stream.seek(container=CONTAINER,\n",
    "                                   stream_path=STREAM_PATH,\n",
    "                                   shard_id=shard_id,\n",
    "                                   seek_type='LATEST')\n",
    "location = seek_response.output.location"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "\n",
    "num_users = 10000\n",
    "num_events = 100000\n",
    "batch_size = 1000\n",
    "\n",
    "for events in generate_events(users=[f'user_{index}' for index in range(num_users)],\n",
    "                              start_time=datetime.now(),\n",
    "                              num_events=num_events,\n",
    "                              batch_size=batch_size):\n",
    "    records = [{'data': json.dumps(event, default=str), 'partition_key': event.get('user')} for event in events]\n",
    "    \n",
    "    v3io_client.batch.stream.put_records(container=CONTAINER,\n",
    "                                         stream_path=STREAM_PATH,\n",
    "                                         records=records)\n",
    "\n",
    "# wait for all writes to complete\n",
    "responses = v3io_client.batch.wait()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The looped `put_records` calls above send all the requests to the data layer in parallel. When `wait` is called, it blocks until either all responses arrive (in which case it returns a `Responses` object that contains the `responses` of each call) or an error occurs, in which case an exception is raised. You can pass `raise_for_status` to `wait` to control which response status codes cause an exception.\n",
    "\n",
    "> **Note**: The `batch` object is stateful, so you can only create one batch at a time. However, you can create multiple parallel batches yourself through the client's `create_batch()` interface."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Read the records from the previously obtained location to verify that the new data was written:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Received 1000 records\n",
      "First record: {'user': 'user_7491', 'time': '2020-10-12 08:27:58.149959', 'url': 'https://example.com/uea3ge8n6qiwepxsk28t7a9t'}\n"
     ]
    }
   ],
   "source": [
    "get_response = v3io_client.stream.get_records(container=CONTAINER,\n",
    "                                              stream_path=STREAM_PATH,\n",
    "                                              shard_id=shard_id,\n",
    "                                              location=location)\n",
    "records = get_response.output.records\n",
    "\n",
    "print(f'Received {len(records)} records')\n",
    "print(f'First record: {json.loads(records[0].data)}')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Delete Stream"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Deletes a stream object along with all of its shards."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "204\n"
     ]
    }
   ],
   "source": [
    "response = v3io_client.stream.delete(container=CONTAINER, stream_path=STREAM_PATH)\n",
    "print(response.status_code)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Alternatively, you can use the following commands:\n",
    "``` python\n",
    "import shutil\n",
    "from os import path, sep\n",
    "V3IO_STREAM_PATH = path.join(sep, 'v3io', CONTAINER, STREAM_PATH)\n",
    "shutil.rmtree(V3IO_STREAM_PATH)\n",
    "```\n",
    "\n",
    "or\n",
    "\n",
    "```\n",
    "!rm -r $V3IO_STREAM_PATH\n",
    "```"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
